{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "98e0d4f3",
   "metadata": {},
   "source": [
    "# Scaling Many Model Training with Ray Tune\n",
    "\n",
    "| Template Specification | Description |\n",
    "| ---------------------- | ----------- |\n",
    "| Summary | This template demonstrates how to parallelize the training of hundreds of time-series forecasting models with [Ray Tune](https://docs.ray.io/en/latest/tune/index.html). The template uses the `statsforecast` library to fit models to partitions of the M4 forecasting competition dataset. |\n",
    "| Time to Run | Around 5 minutes to train all models. |\n",
    "| Minimum Compute Requirements | No hard requirements. The default is 8 nodes with 8 CPUs each. |\n",
    "| Cluster Environment | This template uses the latest Anyscale-provided Ray ML image using Python 3.9: [`anyscale/ray-ml:latest-py39-gpu`](https://docs.anyscale.com/reference/base-images/overview?utm_source=ray_docs&utm_medium=docs&utm_campaign=many_model_training_start_ipynb), with some extra requirements from `requirements.txt` installed on top. If you want to change to a different cluster environment, make sure that it's based on this image and includes all packages listed in the `requirements.txt` file. |\n",
    "\n",
    "The end result of the template is fitting multiple models on each dataset partition, then determining the best model based on cross-validation metrics. Then, using the best model, you can generate forecasts like the ones shown below:\n",
    "\n",
    "![Forecasts](https://github-production-user-asset-6210df.s3.amazonaws.com/3887863/239091118-2413f399-4636-40cf-8b12-8d3ce15f5ce1.png)\n",
    "\n",
    "\n",
    "In many model training, the focus is on training models on multiple subsets of\n",
    "a dataset, rather than training a single model on the entire dataset. Each model is trained on an independent\n",
    "dataset partition, allowing Ray to parallelize the workload by running multiple\n",
    "training jobs concurrently, instead of sequentially training each model.\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "08e65f8d",
   "metadata": {},
   "source": [
    "> Slot in your code below wherever you see the ✂️ icon to build off of this template!\n",
    ">\n",
    "> The framework and data format used in this template can be easily replaced to suit your own application!"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "52aa4f70",
   "metadata": {},
   "source": [
    "## Set up the dependencies\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "488cd257",
   "metadata": {},
   "source": [
    "When running in a distributed Ray Cluster, all nodes need to have access to dependencies.\n",
    "For this, we'll use `pip install --user` to install the necessary requirements. On an Anyscale Workspace, this is configured to install packages to a shared filesystem that will be available to all nodes in the cluster.\n",
    "\n",
    "```\n",
    "pip install --user -r requirements.txt\n",
    "```\n",
    "\n",
    "After installing all the requirements, we'll start with some imports."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b5ac5876",
   "metadata": {},
   "outputs": [],
   "source": [
    "import matplotlib.pyplot as plt\n",
    "import pandas as pd\n",
    "from statsforecast import StatsForecast\n",
    "from statsforecast.models import AutoARIMA, AutoETS, MSTL\n",
    "\n",
    "from ray import train, tune\n",
    "from ray.train import RunConfig\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "44a9dc54",
   "metadata": {},
   "source": [
    "## Define the custom training function\n",
    "\n",
    "Next, we define the custom training function that fits the forecasting models and\n",
    "computes evaluation metrics.\n",
    "Ray Tune will distribute this code across the cluster and schedule for as many training\n",
    "jobs as possible to execute in parallel, considering the available cluster resources."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "060ee3ce",
   "metadata": {},
   "source": [
    "> ✂️ Replace this with your own training logic to run per dataset partition.\n",
    ">\n",
    "> The only additional Ray Tune code that is added is the `train.report`\n",
    "> at the end of the training function. This reports metrics for Ray Tune to log,\n",
    "> which can be analyzed after the run finishes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "faaa0dad",
   "metadata": {},
   "outputs": [],
   "source": [
    "n_cv_windows = 1\n",
    "\n",
    "# Try two different types of forecasting models per dataset partition.\n",
    "# The dataset contains hourly records, so the `season_length` is 24 hours.\n",
    "models = [\n",
    "    AutoETS(season_length=24),\n",
    "    MSTL(season_length=24, trend_forecaster=AutoARIMA()),\n",
    "]\n",
    "\n",
    "# See the appendix for info on setting resource requirements for each trial.\n",
    "cpus_per_trial = len(models) * n_cv_windows\n",
    "\n",
    "\n",
    "def train_fn(config: dict):\n",
    "    # First, define some helper functions for fetching data and computing eval metrics.\n",
    "\n",
    "    def get_m5_partition(unique_id: str) -> pd.DataFrame:\n",
    "        df = pd.read_parquet(\n",
    "            \"https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet\"\n",
    "        )\n",
    "        df = df[df[\"unique_id\"] == unique_id]\n",
    "        return df.dropna()\n",
    "\n",
    "    def evaluate_cross_validation(df: pd.DataFrame) -> pd.DataFrame:\n",
    "        from sklearn.metrics import mean_squared_error\n",
    "\n",
    "        models = df.drop(columns=[\"ds\", \"cutoff\", \"y\"]).columns.tolist()\n",
    "        evals = []\n",
    "        for model in models:\n",
    "            eval_ = (\n",
    "                df.groupby([\"unique_id\", \"cutoff\"])\n",
    "                # Calculate the Root Mean Squared Error (RMSE)\n",
    "                .apply(\n",
    "                    lambda x: mean_squared_error(\n",
    "                        x[\"y\"].values, x[model].values, squared=False\n",
    "                    )\n",
    "                ).to_frame()\n",
    "            )\n",
    "            eval_.columns = [model]\n",
    "            evals.append(eval_)\n",
    "        evals = pd.concat(evals, axis=1)\n",
    "        evals = evals.groupby([\"unique_id\"]).mean(numeric_only=True)\n",
    "        evals[\"best_model\"] = evals.idxmin(axis=1)\n",
    "        return evals\n",
    "\n",
    "    # Later, we will set up Ray Tune to populate `config['data_partition_id']`.\n",
    "    # Use this value to determine which partition of the dataset to use.\n",
    "    data_partition_id = config[\"data_partition_id\"]\n",
    "    train_df = get_m5_partition(data_partition_id)\n",
    "\n",
    "    forecast_horizon = 24  # Forecast the next 24 hours\n",
    "\n",
    "    sf = StatsForecast(\n",
    "        df=train_df,\n",
    "        models=models,\n",
    "        freq=\"H\",\n",
    "        # Set the number of cores used by statsforecast to the\n",
    "        # number of CPUs assigned to the trial!\n",
    "        n_jobs=cpus_per_trial,\n",
    "    )\n",
    "    cv_df = sf.cross_validation(\n",
    "        h=forecast_horizon,\n",
    "        step_size=forecast_horizon,\n",
    "        n_windows=n_cv_windows,\n",
    "    )\n",
    "\n",
    "    eval_df = evaluate_cross_validation(df=cv_df)\n",
    "    best_model = eval_df[\"best_model\"][data_partition_id]\n",
    "    forecast_mse = eval_df[best_model][data_partition_id]\n",
    "\n",
    "    if data_partition_id == \"H1\":\n",
    "        # For the first data partition, plot forecasts of the best model.\n",
    "        forecast_df = sf.forecast(h=forecast_horizon)\n",
    "        fig, ax = plt.subplots(1, 1, figsize=(10, 5))\n",
    "        plot_df = pd.concat([train_df, forecast_df]).set_index(\"ds\")\n",
    "        plot_df[[\"y\", best_model]].plot(ax=ax)\n",
    "        ax.set_title(f\"Forecast for data partition: {data_partition_id}\")\n",
    "        ax.set_xlabel(f\"Timestamp [ds]\")\n",
    "        ax.set_ylabel(f\"Target [y]\")\n",
    "        ax.get_figure().savefig(\"prediction.png\")\n",
    "\n",
    "    # Report the best-performing model and its corresponding eval metric.\n",
    "    train.report({\"forecast_mse\": forecast_mse, \"best_model\": best_model})\n",
    "\n",
    "\n",
    "trainable = tune.with_resources(train_fn, resources={\"CPU\": cpus_per_trial})\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "421eb6f6",
   "metadata": {},
   "source": [
    "## Define the data partitions to train on"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "89741e7a",
   "metadata": {},
   "source": [
    "In this template, we consider the dataset partition ID as a hyperparameter, and we leverage Ray Tune to parallelize the execution of our training function across each dataset partition.\n",
    "\n",
    "> ✂️ Modify the hyperparameter search space `param_space` to enable your training function to configure the dataset! This is how `config['data_partition_id']` from earlier gets populated."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e9f2825",
   "metadata": {},
   "outputs": [],
   "source": [
    "# First, pull the list of unique IDs used to partition the dataset.\n",
    "data_partition_ids = list(\n",
    "    pd.read_parquet(\n",
    "        \"https://datasets-nixtla.s3.amazonaws.com/m4-hourly.parquet\",\n",
    "        columns=[\"unique_id\"],\n",
    "    )[\"unique_id\"].unique()\n",
    ")\n",
    "print(f\"Training on a total of {len(data_partition_ids)} dataset partitions.\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "21bccbcc",
   "metadata": {},
   "outputs": [],
   "source": [
    "param_space = {\n",
    "    \"data_partition_id\": tune.grid_search(data_partition_ids),\n",
    "}\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "13b4dd3e",
   "metadata": {},
   "source": [
    "Run many model training using Ray Tune!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b1ef8245",
   "metadata": {},
   "outputs": [],
   "source": [
    "tuner = tune.Tuner(\n",
    "    trainable,\n",
    "    param_space=param_space,\n",
    "    # Experiment results are saved to a shared filesystem available to all nodes.\n",
    "    run_config=RunConfig(storage_path=\"/mnt/cluster_storage\"),\n",
    ")\n",
    "result_grid = tuner.fit()\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "ba1a07d0",
   "metadata": {},
   "source": [
    "View the reported results of all trials as a dataframe."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d7baa29a",
   "metadata": {},
   "outputs": [],
   "source": [
    "results_df = result_grid.get_dataframe()\n",
    "results_df\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "5ed8df5c",
   "metadata": {},
   "source": [
    "## View one of the model forecasts\n",
    "\n",
    "We saved an image of the forecast generated by the best model trained on the first dataset partition `'H1'`.\n",
    "Let's find that file and display it!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3909636a",
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.display import Image, display\n",
    "import os\n",
    "\n",
    "for result in result_grid:\n",
    "    # Find the result associated with the run that saved a forecast plot.\n",
    "    if result.config[\"data_partition_id\"] == \"H1\":\n",
    "        display(Image(os.path.join(result.path, \"prediction.png\")))\n",
    "        break\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "0c67dfdb",
   "metadata": {},
   "source": [
    "## Summary\n",
    "\n",
    "This template is a quickstart to using [Ray Tune](https://docs.ray.io/en/latest/tune/index.html) for many model training. See [this blog post](https://www.anyscale.com/blog/training-one-million-machine-learning-models-in-record-time-with-ray) for more information on the benefits of performing many model training with Ray!\n",
    "\n",
    "At a high level, this template showed how to do the following:\n",
    "\n",
    "1. [Define the training function for a single partition of data.](https://docs.ray.io/en/latest/tune/tutorials/tune-run.html)\n",
    "2. [Define a Tune search space to run training over many partitions of data.](https://docs.ray.io/en/latest/tune/tutorials/tune-search-spaces.html)\n",
    "3. [Extract the best model per dataset partition from the Tune experiment output.](https://docs.ray.io/en/latest/tune/examples/tune_analyze_results.html)\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "df5fb149",
   "metadata": {},
   "source": [
    "### Appendix\n",
    "\n",
    "#### Specifying required resources\n",
    "\n",
    "`tune.with_resources` was used to specify the resources needed to launch one of our training jobs.\n",
    "Feel free to change this to the resources required by your application! You can also comment out the `tune.with_resources` block to assign `1 CPU` (the default) to each trial.\n",
    "\n",
    "Note that the number of CPUs to assign a trial is dependent on the workload.\n",
    "In this template, `statsforecast` has a `n_jobs` configuration that determines the number of CPU cores to use for performing the model fitting and cross-validation *within a trial*. So, we should set `n_jobs = cpus_per_trial`. We chose to set the parallelism equal to the total number of models that are fitted during cross-validation: `M model types * N temporal cross-validation windows = 2 * 1 = 2`.\n",
    "\n",
    "See [Ray Tune's guide on assigning resources](https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html) for more information."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "dd48618e",
   "metadata": {},
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  },
  "vscode": {
   "interpreter": {
    "hash": "265d195fda5292fe8f69c6e37c435a5634a1ed3b6799724e66a975f68fa21517"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
